I pulled the dataset from the Internet Archive, which hosts the "Complete Public Reddit Comments Corpus".
There is a Reddit post with more details on the dataset.
I'll bring the dataset into my environment, run an ETL over it, and then apply LDA to determine the topics in the comments.
In [2]:
# Install an init script so cluster nodes have boto3 available to push data to an S3 bucket
dbutils.fs.put("/databricks/init/mwc/install_boto3.sh", """
#!/bin/bash
pip install boto3
""", True)
In [4]:
from pyspark.sql.functions import col
import pyspark.sql.functions as F
import xml.etree.ElementTree as ET
import requests
import shutil
reddit_meta_url = 'https://ia801005.us.archive.org/19/items/2015_reddit_comments_corpus/2015_reddit_comments_corpus_files.xml'
r = requests.get(reddit_meta_url, stream=True)
if r.status_code == 200:
    with open('/tmp/reddit_meta.xml', 'wb') as f:
        r.raw.decode_content = True
        shutil.copyfileobj(r.raw, f)
In [5]:
%sh
head -n 10 /tmp/reddit_meta.xml
In [6]:
# Load the Python XML parser to parse the filenames / URL from the metadata
import xml.etree.ElementTree as etree
with open('/tmp/reddit_meta.xml', 'r') as xml_file:
    xml_tree = etree.parse(xml_file)
root = xml_tree.getroot()
# Collect the 'name' attribute from each file entry and the text of the <size> tags
file_names = [elem.attrib['name'] for elem in root.iter() if 'name' in elem.attrib]
file_sizes = [elem.text for elem in root.iter() if 'size' in elem.tag]
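The archive's *_files.xml lists one <file> element per artifact, with the name as an attribute and the size as a child tag; assuming that layout, a hedged alternative keeps each name paired with its own size instead of building two separate lists:

# Sketch: walk <file> elements directly so each name stays with its own <size>
pairs = []
for f_elem in root.iter('file'):
    size_elem = f_elem.find('size')
    if size_elem is not None:
        pairs.append((f_elem.attrib['name'], size_elem.text))
pairs[:3]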
In [7]:
# Find the indices of the bzip2-compressed data files
indx = []
for i, f in enumerate(file_names):
    if "bz2" in f:
        indx.append(i)
# Parse out the matching names and sizes
fnames = [file_names[i] for i in indx]
fsizes = [file_sizes[i] for i in indx]
# Zip the values up to create an RDD and convert it to a DataFrame
df = sc.parallelize(zip(fnames, fsizes)).toDF(["fname", "fsize"])
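The sizes arrive from the XML as strings, so the arithmetic below leans on Spark's implicit casts; an optional sketch (my addition, not part of the original flow) makes the numeric type explicit up front:

# Sketch: cast the string size column to a long so downstream math is explicit
from pyspark.sql.functions import col
df = df.withColumn("fsize", col("fsize").cast("long"))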
In [8]:
from pyspark.sql.functions import col
display(df.select(col("fsize").alias("file size"), col("fname").alias("name")))
In [9]:
import pyspark.sql.functions as F
df_mb = df.withColumn('size_mb', df.fsize / 1000000)
display(df_mb.select(F.split(df_mb.fname, '/')[1].alias('year'), df_mb.size_mb.alias("Size in MB")))
In [10]:
# The 26 largest data files are excluded here since they're > 3GB compressed
# S3 caps a single PUT at 5GB, so I filter out files over 3GB here and push them through the S3 multipart upload API instead
df_mb.filter(df_mb.size_mb >= 3000.0).count()
In [11]:
display(df_mb.filter(df_mb.size_mb < 3000.0))
In [13]:
%run "/Users/mwc@databricks.com/_helper.py"
In [14]:
(ACCESS_KEY, SECRET_KEY) = get_creds()
In [16]:
import requests
def download_file(url):
    local_filename = url.split('/')[-1]
    local_fq_filename = "/tmp/" + local_filename
    # NOTE the stream=True parameter
    r = requests.get(url, stream=True)
    with open(local_fq_filename, 'wb') as f:
        for chunk in r.iter_content(chunk_size=2048):
            if chunk:  # filter out keep-alive new chunks
                f.write(chunk)
    return local_fq_filename
def upload_file_multipart(s3, bucketname, filepath, filename):
    import os
    import math
    bkt = s3.get_bucket(bucketname)
    mp = bkt.initiate_multipart_upload(filename)
    source_size = os.stat(filepath).st_size
    # Upload in ~5GB parts, just under the S3 per-part limit
    bytes_per_chunk = 5000 * 1024 * 1024
    chunks_count = int(math.ceil(source_size / float(bytes_per_chunk)))
    for i in range(chunks_count):
        offset = i * bytes_per_chunk
        remaining_bytes = source_size - offset
        bytes = min([bytes_per_chunk, remaining_bytes])
        part_num = i + 1
        print "uploading part " + str(part_num) + " of " + str(chunks_count)
        with open(filepath, 'rb') as fp:
            fp.seek(offset)
            mp.upload_part_from_file(fp=fp, part_num=part_num, size=bytes)
    if len(mp.get_all_parts()) == chunks_count:
        mp.complete_upload()
        print "upload_file done"
    else:
        mp.cancel_upload()
        print "upload_file failed"
In [17]:
def get_files_to_s3_multipart(files):
    # REF: http://codeinpython.blogspot.com/2015/08/s3-upload-large-files-to-amazon-using.html
    # Import the necessary modules on the executors
    import boto
    import requests
    import os
    debug = True
    bucket_name = get_bucket_name()
    # Base URL to fetch the files
    base_url = 'https://ia801005.us.archive.org/19/items/2015_reddit_comments_corpus/'
    # Connect the boto (v2) S3 client for AWS
    s3client = boto.connect_s3(
        aws_access_key_id=ACCESS_KEY,
        aws_secret_access_key=SECRET_KEY)
    bkt = s3client.get_bucket(bucket_name)
    for r in files:
        # Step 1: Build the full URL and download the file to local /tmp
        file_url = base_url + r.fname
        download_file(file_url)
        # Step 2: Initiate a multipart upload for the large file
        file_name = r.fname.split('/')[-1]
        file_path = "/tmp/" + file_name
        if debug:
            print "filepath: " + file_path
            print "file_name: reddit/" + file_name
        upload_file_multipart(s3client, bucket_name, file_path, "reddit/" + file_name)
        # Delete the local file and continue
        os.remove(file_path)
In [19]:
# Print statements show up in stderr on the cluster executors
# This process takes 2079.28s to complete the download and push to S3
def get_files_to_s3_boto3(files):
    # Import the necessary modules on the executors
    import boto3
    import requests
    import os
    debug = True
    bucketname = get_bucket_name()
    # Register the boto3 S3 client for AWS
    s3client = boto3.client('s3',
                            aws_access_key_id=ACCESS_KEY,
                            aws_secret_access_key=SECRET_KEY)
    # Base URL to fetch the files
    base_url = 'https://ia801005.us.archive.org/19/items/2015_reddit_comments_corpus/'
    for r in files:
        # Build the full URL and download the file to local /tmp
        file_url = base_url + r.fname
        download_file(file_url)
        # Put the file to the S3 bucket
        file_name = r.fname.split('/')[-1]
        s3_obj_key = "reddit/" + file_name
        with open('/tmp/' + file_name, 'rb') as body:
            s3client.put_object(Bucket=bucketname, Key=s3_obj_key, Body=body)
        # Delete the local file and continue
        os.remove("/tmp/" + file_name)
In [21]:
# Split the files on the 3GB size threshold to build two DataFrames that drive the parallel ingestion
df_lt_3gb = df_mb.filter(df_mb.size_mb < 3000.0)
df_gt_3gb = df_mb.filter(df_mb.size_mb >= 3000.0)
In [22]:
# Two implementations to work around S3 upload limits: plain put_object for the smaller files, multipart upload for the larger objects
df_lt_3gb.foreachPartition(get_files_to_s3_boto3)
df_gt_3gb.foreachPartition(get_files_to_s3_multipart)
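foreachPartition hands each executor task an iterator of Rows, so the number of partitions caps how many files are downloaded and uploaded concurrently; a hedged tweak (my addition, partition count chosen arbitrarily) repartitions first to spread the work:

# Sketch: more partitions -> more concurrent download/upload tasks, bounded by cluster cores
df_lt_3gb.repartition(16).foreachPartition(get_files_to_s3_boto3)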
In [24]:
%scala
// Calculate the size of the compressed dataset
import org.apache.spark.sql.functions._
val df = dbutils.fs.ls("/mnt/mwc/reddit").toDF()
val c = df.agg(sum("size")).collect()(0)
val p = dbutils.fs.ls("/mnt/mwc/reddit_parquet").toDF().agg(sum("size")).collect()(0)
In [26]:
# Takes 4836.81 seconds to do the ETL from the raw JSON to Parquet
dfr = sqlContext.read.json("/mnt/mwc/reddit/")
dfr.write.parquet("/mnt/mwc/reddit_parquet")
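Since the JSON-to-Parquet conversion is the expensive step, a quick hedged sanity check (my addition) compares row counts between the two copies before relying on the Parquet data:

# Sketch: the Parquet copy should hold exactly the rows read from the raw JSON
raw_count = dfr.count()
parquet_count = sqlContext.read.parquet("/mnt/mwc/reddit_parquet").count()
assert raw_count == parquet_count, (raw_count, parquet_count)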
In [27]:
dfp = sqlContext.read.parquet("/mnt/mwc/reddit_parquet")
dfp.registerTempTable("mwc_reddit")
dfp.printSchema()
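With the temp table registered, Spark SQL queries can run against it directly; a hedged example follows (it assumes the schema printed above includes a subreddit field, which the Reddit corpus normally carries):

# Sketch: comment volume per subreddit, assuming a 'subreddit' column in the schema above
top_subreddits = sqlContext.sql("""
  SELECT subreddit, COUNT(*) AS num_comments
  FROM mwc_reddit
  GROUP BY subreddit
  ORDER BY num_comments DESC
  LIMIT 20
""")
display(top_subreddits)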
In [28]:
display(dfp)
In [29]:
# Preview the derived 'year' column (from the created_utc epoch timestamp) before writing the partitioned copy
dfp.withColumn('year', F.year(F.from_unixtime(dfp.created_utc)))
In [31]:
# Create the new 'year' column and write the dataset partitioned by year
df_p = dfp.withColumn('year', F.year(F.from_unixtime(dfp.created_utc)))
df_p.write.partitionBy('year').parquet('/mnt/mwc/reddit_year')
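Partitioning by year means a filter on the year column only touches that year's directory; a hedged sketch (my addition; 2007 is just an example value) reads a single partition back to confirm:

# Sketch: a filter on the partition column prunes the read down to one year=... directory
df_one_year = sqlContext.read.parquet("/mnt/mwc/reddit_year").where("year = 2007")
df_one_year.count()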
In [33]:
import pyspark.sql.functions as F
from pyspark.sql.types import *
# To work with the complete dataset, create a temporary table per year and then a master union table
df = sc.parallelize(dbutils.fs.ls("/mnt/mwc/reddit_year")).toDF()
# Parse the year partition to get an array of years to register the tables by
years = df.select(F.regexp_extract('name', '(\d+)', 1).alias('year')).collect()
year_partitions = [x.asDict().values()[0] for x in years if x.asDict().values()[0]]
year_partitions
In [34]:
# Loop over the years and register a temp table per year
for y in year_partitions:
    df = sqlContext.read.parquet("/mnt/mwc/reddit_year/year=%s" % y)
    df.registerTempTable("reddit_%s" % y)
# Register the root directory as a table covering the complete dataset
df_complete = sqlContext.read.parquet("/mnt/mwc/reddit_year/")
df_complete.registerTempTable("reddit_all")
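With a table per year plus reddit_all registered, the same query can target one year or the full corpus; a hedged example (it only assumes the year partition column added above):

# Sketch: comment counts per year from the union table; 'year' is the partition column added above
counts_by_year = sqlContext.sql("""
  SELECT year, COUNT(*) AS num_comments
  FROM reddit_all
  GROUP BY year
  ORDER BY year
""")
display(counts_by_year)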
In [35]: